使用 Clojure 编写 Hive UDF

Table of Contents

在 Hive 中可以用调用外部脚本的方式处理数据,用外部脚本的好处是不受语言限制,通常情况下我们用Python写一些外部脚本来处理一些数据,但是有些情况下写成 UDF(User-defined function)可以简化查询语句,也能更好地使用条件判断。

Hive 是用 Java 开发的,不过任何运行在 JVM 上的语言,只要支持和 Java 交互就可以写 UDF,选 Clojure 的原因很简单,因为它是 Lisp 方言,也能很好地和 Java 代码交互,并且开发效率很高。

Hive 的自定义函数分了三种:UDTF、UDAF 和 UDF,前两种我基本上用不上。

UDF 编写很简单,只要保证 Hive 能调用你暴露的 evaluate 函数(方法)即可,同时满足:

  1. 继承 org.apache.hadoop.hive.ql.exec.UDF;
  2. 调用 UDF 时,保证传入的参数和返回值为 org.apache.hadoop.io.Text 类型(参数和返回值也可以是 String 类型)

本文为 Hive 写一个 upper 函数,将字母转成大写(虽然很无聊,但因为代码量很少,容易让人看懂)

1. 使用 Leiningen 创建项目

关于 Leiningen 是什么、怎么安装,这里不多说了。先创建个项目:

$ lein new upper

然后进入 upper 项目,修改 project.clj,如下:

(defproject upper "0.1.0-SNAPSHOT"
    :description "FIXME: write description"
    :url "http://example.com/FIXME"
    :license {:name "Eclipse Public License"
    :url "http://www.eclipse.org/legal/epl-v10.html"}
    :plugins [[cider/cider-nrepl "0.8.0-SNAPSHOT"]]
    :dependencies [[org.clojure/clojure "1.5.1"]
		   [hive/hive-exec "0.5.0"]
		   [org.apache.hadoop/hadoop-core "0.20.2-dev"]])

主要改动:

1、增加 Hadoop 和 Hive 的依赖

2、安装 cider/cider-nrepl 插件,便于和 Cider 交互开发

然后自动安装依赖:

$ lein deps

2. 编写 UDF

编辑 src/upper/core.clj,详细请看代码注释:

(ns upper.core
  (:import [org.apache.hadoop.hive.ql.exec UDF])
  (:import [org.apache.hadoop.io Text])
  (:gen-class
   :name upper.core
   ;; 继承 org.apache.hadoop.hive.ql.exec.UDF,必须的
   :extends org.apache.hadoop.hive.ql.exec.UDF
   ;; 暴露 evaluate 方法给 Hive,同时声明参数类型和返回值类型
   ;; 列表里是参数类型,参数数目必须和列表数目对应
   ;; 参数和返回值都必须是 org.apache.hadoop.io.Text 类型
   :methods [[evaluate [org.apache.hadoop.io.Text] org.apache.hadoop.io.Text]]))

(defn upper [string]
  (.toUpperCase string))

;; #^Text 是元数据,声明函数返回的是 Text 类型
(defn #^Text -evaluate
  ;; 声明参数为 Text 类型
  [this #^Text string]
  ;; 先将参数值转成字符串类型处理后,再包装成 Text 类型
  (Text. (upper (.toString string))))

这里要注意一个细节,新建 Text 实例时不能给类型为 nil(对应 Java 中的 null)的参数,所以必须保证 upper 函数调用结果是字符串,否则会异常。

3. 编译成 class 文件并打包

首先,需要修改 project.clj,增加 :aot 选项:

:aot [upper.core]

然后,将代码编译成 class。编译时一定要注意 JDK 版本,如果 Hive 用的 Java6,就必须用 Java6 编译:

$ lein compile Compiling upper.core

最后,如果编译顺利通过,再将它打包成 jar 文件:

$ lein uberjar Created /tmp/upper/target/upper-0.1.0-SNAPSHOT.jar Created /tmp/upper/target/upper-0.1.0-SNAPSHOT-standalone.jar

这时会在 target 目录下生成两个 .jar 文件,其中以“-standalone.jar”结尾的 jar 文比较大,应该有 17M 左右,因为它包含了所有的依赖包括 Clojure 自身),所以可以直接放 Hive 上使用。

4. 在 Hive 中调用

为了方便测试,新建一个 hive.sql 文件,把 HiveQL 语句写进去:

-- 将生成的 jar 文件添加到 Hive 中,它会自动分发给其他节点
add jar target/upper-0.1.0-SNAPSHOT-standalone.jar;
-- 注册成 Hive 函数
create temporary function my_upper as 'upper.core';

select my_upper(en_name) from user_information;

然后执行:

$ hive -f hive.sql

如果顺利的话,Hive 将正常返回调用结果。

5. 关于调试

开发 UDF 难免会遇到代码错误,可能是编译中出现错误,也可能是运行时出错。编译中出现错误一般就是语法一类的问题,很好解决;但如果是在 Hive 运行时出错,MapReduce 任务会被杀死,调试也会变得比较难,所以在这之前建议写好单元测试,保证各个部分的代码稳定后再放到 Hive 中。如果在 Hive 运行中死掉,可以到 JobTracker 的 Web 监控页面看调试日志中的 Java 异常信息。

6. 关于性能优化

工作中我主要是写 UDF 处理海量日志,在写某个大量匹配功能的 UDF 时,一千万条日志花了 4 小时。但日志里要匹配的字段有大量是重复的,所以我用了 memoize 函数缓存结果,这时一千万条日志只跑了不到半个小时。如果你处理的数据中如果有比较多重复数据的话,建议使用 memoize。

另外,在大规模数据处理下,一些代码细节优化对速度提升很不明显,建议仔细揣摩代码,优先优化流程。像上面说的场景,我在优化了数据处理流程的情况下,最后只花几分钟就可以处理完一千万条日志了。